package main

import (
	"context"
	"fmt"
	"github.com/segmentio/kafka-go"
	"log"
)

func main() {
	// Kafka broker addresses
	brokers := []string{"localhost:9092"}

	// Topic and partition you want to consume from
	topic := "my-topic"
	partition := 2 // For example, partition 0

	// Create a new kafka.Reader instance for the specified partition
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   brokers,
		Topic:     topic,
		Partition: partition, // Notice 注意：指定了partition就不要再指定 GroupID 了!!!
	})
	defer r.Close()

	for {
		// Read a message from the partition
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Printf("error while reading message: %v", err)
			break
		}

		// Process the message
		// TODO 做一些业务处理...

		fmt.Printf("message at partition: %v, offset %d: %s = %s\n", m.Partition, m.Offset, string(m.Key), string(m.Value))
	}
}
